package com.cube.share.delay.manager;

import com.cube.share.base.utils.JacksonUtils;
import com.cube.share.base.utils.RedisUtils;
import com.cube.share.delay.config.RabbitMqDelayMessageConfig;
import com.cube.share.delay.handler.DelayMessageHandler;
import com.cube.share.delay.message.DelayMessage;
import com.cube.share.delay.message.DelayMessageType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static com.cube.share.base.utils.SpringContextUtil.getBean;

/**
 * @author cube.li
 * @date 2021/9/22 16:03
 * @description 基于rabbitmq实现的延时消息管理器
 */
@Component
@DependsOn({"springContextUtil", "rabbitTemplate"})
@Slf4j
public class RabbitmqDelayMessageManager implements DelayMessageManager {

    private final Map<DelayMessageType, DelayMessageHandler> handlerMap = new ConcurrentHashMap<>(16);

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMqDelayMessageConfig.DELAYED_QUEUE_NAME, ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        String bodyString = new String(message.getBody());
        DelayMessage delayMessage = JacksonUtils.readJsonString(bodyString, DelayMessage.class);
        log.info("接收到延时消息:{}", delayMessage.toString());
        if (delayMessage.getSerialId() != null) {
            final String key = generateKey(delayMessage);
            if (!RedisUtils.hasKey(key)) {
                log.warn("message has already removed from delay queue");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
        }
        try {
            handlerMap.get(delayMessage.getType()).handle(delayMessage);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

    @Override
    public void add(DelayMessage message) {
        message.check();
        if (message.getSerialId() != null) {
            String key = generateKey(message);
            RedisUtils.set(key, 1, getKeyExpire(message.getProperties(), 1000));
        }
        rabbitTemplate.convertAndSend(RabbitMqDelayMessageConfig.DELAYED_EXCHANGE_NAME, RabbitMqDelayMessageConfig.DELAYED_ROUTING_KEY, message, msg -> {
            MessageProperties messageProperties = msg.getMessageProperties();
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            long expiration = TimeUnit.MILLISECONDS.convert(message.getProperties().getExpire(), message.getProperties().getTimeUnit());
            messageProperties.setDelay((int) expiration);
            return msg;
        });
    }

    @Override
    public boolean remove(DelayMessage message) {
        final String key = generateKey(message);
        if (RedisUtils.hasKey(key)) {
            return RedisUtils.delete(key);
        }
        return false;
    }

    @Override
    public void destroy() {
        //do nothing
    }

    @Override
    public void afterPropertiesSet() {
        Arrays.stream(DelayMessageType.values()).forEach(delayMessageType -> handlerMap.put(delayMessageType, getBean(delayMessageType.getHandler())));
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }

    private String generateKey(DelayMessage message) {
        return message.getType().toString() + ":" + message.getSerialId();
    }

    private long getKeyExpire(DelayMessage.DelayMessageProperties properties, @SuppressWarnings("SameParameterValue") long offset) {
        return TimeUnit.MILLISECONDS.convert(properties.getExpire(), properties.getTimeUnit()) + offset;
    }
}
